package com.fwmagic.dynamic_rule.service.impl;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.RuleAtomicParam;
import com.fwmagic.dynamic_rule.bean.RuleParam;
import com.fwmagic.dynamic_rule.service.UserActionCountQueryService;
import com.fwmagic.dynamic_rule.utils.RuleCalcUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ListState;

import java.util.*;

/**
 * {@link UserActionCountQueryService} implementation that evaluates "user action count"
 * rule conditions by scanning the events held in a Flink {@link ListState}.
 *
 * <p>For every stored {@link LogBean} that matches a condition (as decided by
 * {@code RuleCalcUtils.eventBeanParamMatchRuleParam}), the condition's real count is
 * incremented by one. The real count is incremented, never reset, so whatever value the
 * condition already carries when it is passed in is kept and added to.
 *
 * <p>A condition is considered satisfied when its real count is greater than or equal to
 * its required count; both query overloads use this same test.
 */
@Slf4j
public class UserActionCountQueryServiceStateImpl implements UserActionCountQueryService {

    /** State holding the events to scan; stays {@code null} when the no-arg constructor is used. */
    private ListState<LogBean> listState;

    /**
     * Creates a service that reads events from the given state.
     *
     * @param listState the state whose events are scanned by the query methods
     */
    public UserActionCountQueryServiceStateImpl(ListState<LogBean> listState) {
        this.listState = listState;
    }

    /**
     * Creates a service without a backing state. The query methods of such an instance
     * fail with an {@link IllegalStateException}, because there is nothing to scan.
     */
    public UserActionCountQueryServiceStateImpl() {
    }


    /**
     * Checks whether every user-action-count condition of the rule is satisfied by the
     * events in state, and writes the counts that were found back into the conditions.
     *
     * @param deviceId  identifier of the user/device; not read by this implementation
     * @param ruleParam rule whose action-count conditions are evaluated
     * @return {@code true} if every condition's real count reaches its required count, or
     *         if the rule has no action-count conditions; {@code false} otherwise
     * @throws IllegalStateException if this instance was created without a backing state
     * @throws Exception             if reading the Flink state fails
     */
    @Override
    public boolean queryActionCounts(String deviceId, RuleParam ruleParam) throws Exception {
        List<RuleAtomicParam> userActionCountParam = ruleParam.getUserActionCountParam();
        if (userActionCountParam == null || userActionCountParam.isEmpty()) {
            // No count conditions to check: vacuously satisfied.
            return true;
        }

        // Match events (and their properties) against each condition, recording the match counts.
        queryActionCountsHelper(readEvents(), userActionCountParam);

        // The rule holds only if every condition reached its required count.
        for (RuleAtomicParam ruleAtomicParam : userActionCountParam) {
            if (ruleAtomicParam.getRealCnt() < ruleAtomicParam.getCnt()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Matches every event against every condition and increments a condition's real count
     * once for each event that matches it.
     *
     * @param logBeans             events to scan; {@code null} is treated as "no events"
     * @param userActionCountParam conditions to update; {@code null} is treated as "no conditions"
     */
    public void queryActionCountsHelper(Iterable<LogBean> logBeans, List<RuleAtomicParam> userActionCountParam) {
        if (logBeans == null || userActionCountParam == null) {
            return;
        }
        for (LogBean logBean : logBeans) {
            for (RuleAtomicParam ruleAtomicParam : userActionCountParam) {
                boolean isMatch = RuleCalcUtils.eventBeanParamMatchRuleParam(logBean, ruleAtomicParam, true);
                if (isMatch) {
                    ruleAtomicParam.setRealCnt(ruleAtomicParam.getRealCnt() + 1);
                    log.debug("用户:{},查询了State的事件:{},查询到的次数是:{}", logBean.getDeviceId(), ruleAtomicParam.getEventId(), ruleAtomicParam.getRealCnt());
                }
            }
        }
    }

    /**
     * Checks a single user-action-count condition against the events in state and writes
     * the count that was found back into the condition.
     *
     * @param deviceId        identifier of the user/device; not read by this implementation
     * @param ruleAtomicParam condition to evaluate; its real count is incremented per matching event
     * @return {@code true} if the condition's real count reaches its required count
     * @throws IllegalStateException if this instance was created without a backing state
     * @throws Exception             if reading the Flink state fails
     */
    @Override
    public boolean queryActionCounts(String deviceId, RuleAtomicParam ruleAtomicParam) throws Exception {
        for (LogBean logBean : readEvents()) {
            boolean isMatch = RuleCalcUtils.eventBeanParamMatchRuleParam(logBean, ruleAtomicParam, true);
            if (isMatch) {
                ruleAtomicParam.setRealCnt(ruleAtomicParam.getRealCnt() + 1);
            }
        }
        // "At least the required count", the same test the rule-level overload applies.
        // An exact-equality test would reject users who exceeded the threshold.
        return ruleAtomicParam.getRealCnt() >= ruleAtomicParam.getCnt();
    }

    /**
     * Reads the events currently held in state.
     *
     * @return the stored events, or an empty sequence when the state reports none
     * @throws IllegalStateException if no state was supplied at construction time
     * @throws Exception             if the state access itself fails
     */
    private Iterable<LogBean> readEvents() throws Exception {
        if (listState == null) {
            throw new IllegalStateException(
                    "No ListState available: construct UserActionCountQueryServiceStateImpl with a ListState before querying");
        }
        Iterable<LogBean> events = listState.get();
        if (events == null) {
            // The state contract allows null to signal empty state; normalise it for callers.
            return Collections.<LogBean>emptyList();
        }
        return events;
    }

}
